<?php
namespace Swork\Queue;

use Swork\Client\Redis;
use Swork\Service;

/**
 * Redis实现的队列
 * Class RedisQueue
 * @package Swork\Queue
 */
class RedisQueue implements QueueInterface
{
    /**
     * Redis client shared by every RedisQueue instance in this process.
     * @var Redis
     */
    private static $redis = null;

    /**
     * Name of the Redis hash that maps each queue key to its recovery time
     * (in minutes). The same name is used as the row key in the server table
     * where the timer heartbeat timestamp is stored.
     * @var string
     */
    private $mkey = 'swork_queue_key';

    /**
     * Creates the shared Redis client on first use and, at that moment,
     * makes sure the periodic reset timer has been requested.
     */
    public function __construct()
    {
        if (self::$redis === null)
        {
            self::$redis = new Redis();
            $this->createTimer();
        }
    }

    /**
     * Requests the periodic timer that calls reset(), unless a recent
     * heartbeat in the server table shows one is already running.
     */
    private function createTimer()
    {
        $now = time();

        // reset() refreshes this timestamp on every run; a value younger than
        // 10 seconds is taken as proof that a timer already exists.
        $lastTick = Service::$server->table->get($this->mkey, 'data');
        if ($lastTick != false && ($now - $lastTick) < 10)
        {
            return;
        }

        // NOTE(review): 5000 is presumably the interval in milliseconds - confirm
        // against the task manager's addTask() signature.
        $args = [5000, static::class, 'reset'];
        if (Service::$server->worker_id == 0)
        {
            // Worker 0 registers the task directly.
            Service::$taskManager->addTask(...$args);
        }
        else
        {
            // Any other worker forwards the request to worker 0 as a message.
            $info = [
                'act' => 'AddTask',
                'args' => $args
            ];
            Service::$server->sendMessage(serialize($info), 0);
        }

        // Record when the timer was requested so other workers do not request it again.
        Service::$server->table->set($this->mkey, ['data' => $now]);
    }

    /**
     * Pushes a payload onto the queue identified by $key.
     * @param string $key Queue key
     * @param array $data Application payload
     * @param int $rtime Recovery time in minutes: how long a popped but
     *                   un-acked message may stay in flight before reset()
     *                   puts it back on the queue
     */
    public function push(string $key, array $data, int $rtime = 1)
    {
        // Enqueue the serialized payload.
        self::$redis->lPush($key, serialize($data));

        // Register the queue in the monitor hash, or update its recovery time
        // when it differs from the stored one.
        $stored = self::$redis->hGet($this->mkey, $key);
        if ($stored == false || $stored != $rtime)
        {
            self::$redis->hSet($this->mkey, $key, $rtime);
        }
    }

    /**
     * Pops the next message from the queue identified by $key and parks it
     * in the queue's in-flight hash until ack() is called.
     *
     * A popped value whose md5 already exists in the in-flight hash is
     * discarded and the next value is tried.
     *
     * @param string $key Queue key
     * @return array|bool ['id' => string, 'data' => array], or false when the queue is empty
     */
    public function pop(string $key)
    {
        $hkey = $this->getHashKey($key);

        // Loop instead of recursing so a long run of duplicates cannot grow the stack.
        while (true)
        {
            $val = self::$redis->rPop($key);
            if ($val == false)
            {
                return false;
            }

            // The md5 of the serialized payload doubles as the message id.
            $filed = md5($val);
            if (self::$redis->hExists($hkey, $filed))
            {
                // An identical payload is already in flight: skip this one.
                continue;
            }

            // NOTE(review): unserialize() can instantiate arbitrary classes; this is
            // only safe while nothing untrusted can write to this Redis instance.
            $data = unserialize($val);

            // Park the message together with its pop time so reset() can recover it.
            $value = [
                'data' => $data,
                'time' => time()
            ];
            self::$redis->hSet($hkey, $filed, serialize($value));

            return [
                'id' => $filed,
                'data' => $data
            ];
        }
    }

    /**
     * Confirms that a message has been consumed by removing it from the
     * queue's in-flight hash.
     * @param string $key Queue key
     * @param string $id Message id as returned by pop()
     */
    public function ack(string $key, string $id)
    {
        self::$redis->hDel($this->getHashKey($key), $id);
    }

    /**
     * Timer callback: refreshes the heartbeat and re-queues every in-flight
     * message whose recovery time has elapsed without an ack().
     */
    public function reset()
    {
        // Refresh the heartbeat so createTimer() knows the timer is alive.
        Service::$server->table->set($this->mkey, ['data' => time()]);

        // Fetch every monitored queue key with its recovery time (minutes).
        $keys = self::$redis->hGetAll($this->mkey);
        if ($keys == false)
        {
            return;
        }

        $now = time();
        foreach ($keys as $key => $rtime)
        {
            // PHP converts numeric-string array keys to ints; restore the string form.
            $key = (string)$key;
            $hkey = $this->getHashKey($key);
            $items = self::$redis->hGetAll($hkey);
            if ($items == false)
            {
                continue;
            }

            // Keep minutes and seconds apart: push() expects minutes, the expiry
            // check needs seconds. Passing the seconds value to push() would
            // overwrite the stored recovery time with a value 60 times larger.
            $minutes = (int)$rtime;
            $timeout = $minutes * 60;

            foreach ($items as $id => $item)
            {
                $item = unserialize($item);
                $time = is_array($item) ? ($item['time'] ?? 0) : 0;
                if ($time >= 0 && ($time + $timeout) >= $now)
                {
                    // Still within its recovery window.
                    continue;
                }

                self::$redis->hDel($hkey, $id);

                // An entry that cannot be decoded into an array payload cannot be
                // re-queued; it is only removed so it does not abort the sweep.
                if (is_array($item) && is_array($item['data'] ?? null))
                {
                    $this->push($key, $item['data'], $minutes);
                }
            }
        }
    }

    /**
     * Builds the name of the in-flight hash for a queue key.
     *
     * NOTE(review): for $key === 'key' the result equals the monitor hash
     * name ('swork_queue_key'), so that queue name would collide with it.
     *
     * @param $key
     * @return string
     */
    private function getHashKey($key)
    {
        return "swork_queue_$key";
    }
}
